package drds.binlog.parse.inbound.mysql.rds;

import drds.binlog.common.position.EntryPosition;
import drds.binlog.common.position.LogPosition;
import drds.binlog.parse.EventParser;
import drds.binlog.parse.exception.BinlogParseException;
import drds.binlog.parse.exception.PositionNotFoundException;
import drds.binlog.parse.exception.ServerIdNotMatchException;
import drds.binlog.parse.inbound.Connection;
import drds.binlog.parse.inbound.ParserExceptionHandler;
import drds.binlog.parse.inbound.mysql.LocalBinLogConnection;
import drds.binlog.parse.inbound.mysql.LocalBinlogEventParser;
import drds.binlog.parse.inbound.mysql.rds.data.BinlogFile;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.springframework.util.Assert;

import java.io.File;
import java.util.Date;
import java.util.List;

/**
 * 基于rds binlog备份文件的复制
 */
public class RdsLocalBinlogEventParser extends LocalBinlogEventParser implements EventParser, LocalBinLogConnection.FileParserListener
{

    private String url;                // openapi地址
    private String accesskey;          // 云账号的ak
    private String secretkey;          // 云账号sk
    private String instanceId;         // rds实例id
    private Long startTime;
    private Long endTime;
    private BinlogDownloadQueue binlogDownloadQueue;
    private ParseFinishListener finishListener;
    private int batchFileSize;

    public RdsLocalBinlogEventParser()
    {
    }

    public void start() throws BinlogParseException
    {
        try
        {
            Assert.notNull(accesskey);
            Assert.notNull(secretkey);
            Assert.notNull(instanceId);
            Assert.notNull(url);
            Assert.notNull(directory);

            if (endTime == null)
            {
                endTime = System.currentTimeMillis();
            }

            EntryPosition entryPosition = findStartPosition(null);
            if (entryPosition == null)
            {
                throw new PositionNotFoundException("index$originIndex not found!");
            }
            long startTimeInMill = entryPosition.getTimestamp();
            startTime = startTimeInMill;
            List<BinlogFile> binlogFiles = RdsBinlogOpenApi.listBinlogFiles(url,
                    accesskey,
                    secretkey,
                    instanceId,
                    new Date(startTime),
                    new Date(endTime));
            binlogDownloadQueue = new BinlogDownloadQueue(binlogFiles, batchFileSize, directory);
            binlogDownloadQueue.silenceDownload();
            needWait = true;
            // try to download one file,use to test server id
            binlogDownloadQueue.tryOne();
        } catch (Throwable e)
        {
            logger.error("download binlog failed", e);
            throw new BinlogParseException(e);
        }
        setParserExceptionHandler(new ParserExceptionHandler()
        {

            @Override
            public void handle(Throwable e)
            {
                handleMysqlParserException(e);
            }
        });
        super.start();
    }

    private void handleMysqlParserException(Throwable throwable)
    {
        if (throwable instanceof ServerIdNotMatchException)
        {
            logger.error("server id not match, try download another rds binlog!");
            binlogDownloadQueue.notifyNotMatch();
            try
            {
                binlogDownloadQueue.cleanDir();
                binlogDownloadQueue.tryOne();
                binlogDownloadQueue.prepare();
            } catch (Throwable e)
            {
                throw new RuntimeException(e);
            }

            try
            {
                binlogDownloadQueue.execute(new Runnable()
                {

                    @Override
                    public void run()
                    {
                        RdsLocalBinlogEventParser.super.stop();
                        RdsLocalBinlogEventParser.super.start();
                    }
                });
            } catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }

        }
    }

    @Override
    protected Connection buildErosaConnection()
    {
        Connection connection = super.buildErosaConnection();
        if (connection instanceof LocalBinLogConnection)
        {
            LocalBinLogConnection localBinLogConnection = (LocalBinLogConnection) connection;
            localBinLogConnection.setNeedWait(true);
            localBinLogConnection.setServerId(serverId);
            localBinLogConnection.setFileParserListener(this);
        }
        return connection;
    }

    public String getUrl()
    {
        return url;
    }

    public void setUrl(String url)
    {
        if (StringUtils.isNotEmpty(url))
        {
            this.url = url;
        }
    }

    public void setAccesskey(String accesskey)
    {
        this.accesskey = accesskey;
    }

    public void setSecretkey(String secretkey)
    {
        this.secretkey = secretkey;
    }

    public void setInstanceId(String instanceId)
    {
        this.instanceId = instanceId;
    }

    public void setStartTime(Long startTime)
    {
        this.startTime = startTime;
    }

    public void setEndTime(Long endTime)
    {
        this.endTime = endTime;
    }

    @Override
    public void onFinish(String fileName)
    {
        try
        {
            binlogDownloadQueue.downOne();
            File needDeleteFile = new File(directory + File.separator + fileName);
            if (needDeleteFile.exists())
            {
                needDeleteFile.delete();
            }
            // 处理下logManager位点问题
            LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
            Long timestamp = 0L;
            if (logPosition != null && logPosition.getEntryPosition() != null)
            {
                timestamp = logPosition.getEntryPosition().getTimestamp();
                EntryPosition position = logPosition.getEntryPosition();
                LogPosition newLogPosition = new LogPosition();
                String journalName = position.getJournalName();
                int sepIdx = journalName.indexOf(".");
                String fileIndex = journalName.substring(sepIdx + 1);
                int index = NumberUtils.toInt(fileIndex) + 1;
                String newJournalName = journalName.substring(0, sepIdx) + "."
                        + StringUtils.leftPad(String.valueOf(index), fileIndex.length(), "0");
                newLogPosition.setEntryPosition(new EntryPosition(newJournalName,
                        4L,
                        position.getTimestamp(),
                        position.getServerId()));
                newLogPosition.setLogIdentity(logPosition.getLogIdentity());
                logPositionManager.persistLogPosition(destination, newLogPosition);
            }

            if (binlogDownloadQueue.isLastFile(fileName))
            {
                logger.warn("last file : " + fileName + " , timestamp : " + timestamp
                        + " , all file parse complete, switch to mysql parser!");
                finishListener.onFinish();
                return;
            } else
            {
                logger.warn("parse local binlog file : " + fileName + " , timestamp : " + timestamp
                        + " , try the next binlog !");
            }
            binlogDownloadQueue.prepare();
        } catch (Exception e)
        {
            logger.error("prepare download binlog file failed!", e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void stop()
    {
        this.binlogDownloadQueue.release();
        super.stop();
    }

    public void setFinishListener(ParseFinishListener finishListener)
    {
        this.finishListener = finishListener;
    }

    public void setBatchFileSize(int batchFileSize)
    {
        this.batchFileSize = batchFileSize;
    }

    public interface ParseFinishListener
    {

        void onFinish();
    }
}
